${\bf \large \cal Hovhannes \ Grigoryan}\\ {\rm \small New \ York, \ NY}$
We describe the theory and the algorithm behind the linear Kalman filter, and write a simple implementation for it. We also implement adaptive Kalman filter that modifies observation covariance matrix based on recent measurements, and asynchronous Kalman filter that processes observations at different rates. We demonstrate applications of Kalman filter for 1D and 2D projectile motions, GPS signal filtering, stock return prediction. Finally, we describe how Kalman filter can be used to perform dynamic linear regression.
- The Linear State Space Model
- The Linear Kalman Filter
- Projectile Motion 1D
- Projectile Motion 2D
- Kalman Filter for Asynchronous Measurements
- Applications to GPS Signals
- Applications to Stock Returns
- Using Kalman Filter for Linear Regression
%%html
<style>
body, p, div.rendered_html {
color: black;
font-family: "Times Roman", sans-serif;
font-size: 12pt;
}
body {
background-color: #B8BAC3;
}
</style>
import sys, os
import warnings
sys.path.append(os.path.abspath(os.path.join('..')))
warnings.filterwarnings('ignore')
import random
import numpy as np
import matplotlib.pyplot as plt
from utils.html_converter import html_converter
%precision 20
%matplotlib inline
%config InlineBackend.figure_format = 'retina'
plt.rcParams['figure.figsize'] = (14,8)
plt.style.use('fivethirtyeight')
The main ingredients of a discrete, linear, time-varying (Gaussian-Markov, standard second-order) state space model are:
Here is the linear state-space system
Matrices $A, Q, G, R$ determine the probability distributions of $\{x_t\}$ and $\{y_t\}$, while $x_0$ and $w_t/v_t$ pin down the values of these sequences.
The purpose of filtering is to extract the required information from a signal, ignoring everything else. More quantitatively, the goal of a filter is to find a state estimate, $\hat{x}_t$, that maximizes the probability or likelihood of measurement $y_t$, that is $\max[\Pi_t p(y_t|\hat{x}_t)]$. Kalman filter (Kalman '60) is a solution to the Bayesian filtering equations, where the dynamic and measurement models are linear Gaussian.
Consider Markovian dynamics for the state variable $x_t$ satisfying the following linear state equation:
$$x_{t} = A_t x_{t-1} + B_t u_t + w_{t}, \ \ \text{where} \ \ w_t \sim N(0, Q) ,$$$A_t$ is the state transition matrix, $B_t$ is the control-input matrix, $u_t$ is the control vector, and $w_t$ is the process noise. We also assume Gaussian prior for the state variable:
$$p(x_t) = N(x_t; \hat{x}_t, \Sigma_t), \ \ \text{where} \ \ \Sigma_t = \mathbb{E}[(x_t - \hat{x}_t )(x_t - \hat{x}_t)^T].$$In linear Gaussian Kalman process, the observation is described by the following measurement equation:
$$y_t = G_t x_t + v_t, \ \ \text{where} \ \ v_t \sim N(0, R),$$$y_t$ is the measurement vector, $G_t$ is the observation matrix, $v_t$ is the observation noise.
Assume $\hat{x}^{-}_t$ is the a priori estimate with knowledge about the process prior time $t$, and $\hat{x}_t$ is the a posteriori state estimate including the measurement $y_t$. The update equation can be written in the following general form:
$$ \hat{x}_t = \hat{x}^{-}_t + K_t (y_t - G \hat{x}^{-}_t), $$where $K_t$ is the optimal Kalman gain at time $t$. The form of $K_t$ can be derived by minimizing the the mean square error (MSE).
To derive the Kalman gain, we rewrite the above equation as:
$$ \hat{x}_t = \hat{x}^{-}_t + K_t (G x_t + v_t - G \hat{x}^{-}_t), $$and taking into account that $\Sigma_t = \mathbb{E}[(x_t - \hat{x}_t )(x_t - \hat{x}_t)^T]$, we will get the error covariance update equation:
$$ \Sigma_t = (\mathbb{1} - K_t G)\Sigma^-_t (\mathbb{1} - K_t G)^T + K_t R K_t^T. $$Since the trace of the error covariance matrix is the sum of the MSE, to minimize MSE, we need to minimize $\text{Tr}(\Sigma_t)$. More explicitly, the trace can be written as:
$$ \text{Tr}(\Sigma_t) = \text{Tr}(\Sigma_t^-) - 2\text{Tr}(K_t G \Sigma_t^-) + \text{Tr}(K_t(G \Sigma_t^- G^T + R)K_t^T).$$Differentiating w.r.t. $K_t$, and equating to zero, we get:
$$ d\text{Tr}(\Sigma_t)/d K_t = - 2 \Sigma_t^-G^T + 2 K_t(G \Sigma_t^- G^T + R) = 0 $$Therefore, solving for $K_t$, we get:
$$K_t = \Sigma^-_t G^T (G \Sigma^-_t G^T + R)^{-1}.$$This is not the only way to get $K_t$. The expression for the Kalman gain emerges quite naturally when we work with distributions, and use the properties of conditional multivariate Gaussian distributions.
Time Update (prediction): Find the estimate and the error covariance at time $t$, given the state and error covariance at $t-1$:
These are clearly the moments of a predicted Gaussian distribution, $p(x_{t} | y_{1:t-1})$, given that $x_{t-1} \sim N(\hat{x}_{t-1}, \Sigma_{t-1})$.
Measurement Update (correction): Given the measurement at time $t$, update the state $\hat{x}^{-}_t$ and $\Sigma^-_t$,
The corrected moments are obtained using Bayes' rule, from the update step of Bayesian Filtering:
$$p(x_{t}|y_{1:t}) = \frac{p(y_t | x_{t}) p(x_{t} | y_{1:t-1})}{\int p(y_t | x_t) p(x_t | y_{1:t-1}) ~\text{d}x_{t}} $$and measurement model,
$$y_t = G_t x_t \sim p(y_t|x_t) = N(y_t; G_t \hat{x}^-_t, R_t).$$Since we assumed Markovian dynamics,
$$ p(x_{t} | y_{1:t-1}) = p(x_{t}^-) = N(x_{t}^-; \hat{x}_{t}^-, \Sigma_{t}^-)$$is a prior distribution, while
$$p(x_{t}|y_{1:t})=p(x_t|y_t) = N(x_t; \hat{x}_{t}, \Sigma_{t})$$is a posterior distribution. Since, the denominator in a Bayes' rule is essentially a normalizing constant, and the product of Gaussian distributions is also a Gaussian, we arrive to the above prescription for the "correction" step.
Forecasting/Time Update/Prediction Step: In this step, we want to find the predictive distribution
$$p(x_{t| t-1}) = N(x_{t}; \hat{x}_{t|t-1}, \Sigma_{t|t-1})$$From the dynamics of state space variable $x_t$, we have:
$$\hat{x}_{t | t-1} = A_t \hat{x}_{t-1|t-1} + B_t u_t$$$$\Sigma_{t| t-1} = A_t \Sigma_{t-1| t-1} A^T_t + Q_t $$Filtering/Measurement Update/Correction Step: In this step, we update the prior after observing the data, $y_t$,
$$p(x_{t | t}) = N(x_{t | t}; \hat{x}_{t | t}, \Sigma_{t | t})$$$$K_t = \Sigma_{t| t-1} G^T_t (G_t \Sigma_{t| t-1} G^T_t + R_t)^{-1}$$$$\hat{x}_{t|t} = \hat{x}_{t|t-1} + K_t (y_t - G_t\hat{x}_{t|t-1}) $$$$\Sigma_{t | t} = (\mathbb{1} - K_t G_t) \Sigma_{t|t-1} $$where $\hat{x}_{t|t}$ is the a posteriori state estimate at time $t$ given observations up to and including at time $t$, and $\Sigma_{t|t}$ is the a posteriori error covariance matrix (a measure of the estimated accuracy of the state estimate). Also, $y_t - G_t\hat{x}_{t|t-1}$ is called innovation residual, and $G_t \Sigma_{t|t-1} G^T_t + R_t$ is innovation covariance.
If the state and observation noise processes are not stationary then we need to infer the variances. This can be done by using, e.g., causal re-estimation method (Jazwinski), or effective but acausal EM based methods, or variational Bayes approach.
class PlainKalmanFilter(object):
""" Kalman Filter.
> x_{t} = A x_{t-1} + B_{t} u_{t} + w_{t}, where w_t ~ N(0, Q)
> y_t = G x_t + v_t, where v_t ~ N(0, R)
params::
x: [n_states, 1] initial state mean
Sigma: [n_states, n_states] initial state covariance matrix
A: [n_states, n_states] state transition matrix
Q: [n_states, n_states] transition covariance matrix
B: [n_states, n_u] control matrix
u: [n_u, 1] control vector
y: [n_obs, 1] (single) observation
G: [n_obs, n_states] measurement/observation matrix
R: [n_obs, n_obs] observation covariance matrix
"""
def __init__(self, x, Sigma, A, G, Q, R, B=None, u=None):
self.x = x
self.Sigma = Sigma
self.A = A
self.G = G
self.Q = Q
self.R = R
self.B = B
self.u = u
self.n_states = None
self.n_obs = None
self.offset = None
self._initialize()
def _initialize(self):
n = self.A.shape[0]
k = self.G.shape[0]
self.n_states = n
self.n_obs = k
assert (self.x.shape == (n, 1))
assert (self.Sigma.shape == (n, n))
assert (self.A.shape[1] == n)
assert (self.G.shape[1] == n)
assert (self.Q.shape == (n, n))
assert (self.R.shape == (k, k))
if self.B is not None and self.u is not None:
self.offset = np.dot(self.B, self.u)
assert(self.offset.shape == (n, 1))
def predict_next_state(self, x, Sigma):
""" Predicts next prior moments (x, Sigma) based on previous moments."""
assert (x.shape == (self.n_states, 1))
assert (Sigma.shape == (self.n_states, self.n_states))
if self.offset is not None:
xp = np.dot(self.A, x) + self.offset
else:
xp = np.dot(self.A, x)
Sigmap = np.dot(self.A, np.dot(Sigma, self.A.T)) + self.Q
return xp, Sigmap
def predict_observation(self, x, Sigma):
""" Finds E[y], Cov[y], given (x, Sigma)"""
assert (x.shape == (self.n_states, 1))
assert (Sigma.shape == (self.n_states, self.n_states))
obs_mean = np.dot(self.G, x)
obs_cov = np.dot(self.G, np.dot(Sigma, self.G.T)) + self.R
return obs_mean, obs_cov
def filter_state(self, y, x, Sigma):
""" Updates/filters (x, Sigma) to posterior moments based on a current single measurement y_t."""
assert (y.shape == (self.n_obs, 1))
assert (x.shape == (self.n_states, 1))
assert (Sigma.shape == (self.n_states, self.n_states))
z = y - np.dot(self.G, x) # innovation or observation residual
S = np.dot(self.G, np.dot(Sigma, self.G.T)) + self.R # innovation or residual covariance
SI = np.linalg.pinv(S)
K = np.dot(Sigma, np.dot(self.G.T, SI)) # optimal Kalman gain
x_post = x + np.dot(K, z) # updated (a posteriori) state estimate
Sigma_post = Sigma - np.dot(K, np.dot(self.G, Sigma)) # updated (a posteriori) covariance estimate
return x_post, Sigma_post
def filter(self, y, order="predict_filter"):
""" Performs sequential state update followed by measurement update.
Returns all {x_t}, {Sigma_t} given data {y_t}."""
assert (y.shape[0] == self.n_obs)
means = []
covs = []
xp, Sigmap = self.x, self.Sigma
for yi in y.T:
obs = np.atleast_2d(yi).reshape(self.n_obs, 1)
if order=="predict_filter":
xp, Sigmap = self.predict_next_state(xp, Sigmap)
xp, Sigmap = self.filter_state(obs, xp, Sigmap)
elif order=="filter_predict":
# This is experimental
xp, Sigmap = self.filter_state(obs, xp, Sigmap)
xp, Sigmap = self.predict_next_state(xp, Sigmap)
means.append(xp)
covs.append(Sigmap)
return np.array(means), np.array(covs)
def smooth_next(self, x, Sigma, next_x, next_Sigma):
""" Kalman smoother backwards step. Takes posterior moments (x, Sigma)
and (next_x, next_Sigma) to return smoothed moments.
params:
x: posterior mean E[x_j|y_1...y_j],
Sigma: posterior covariance Cov[x_j|y_1...y_j],
next_x: E[x_{j+1}|y_1...y_T]
next_Sigma: Cov[x_{j+1}|y_1...y_T]
return:
smoothed mean: E[x_j|y_1...y_T],
smoothed covariance: Cov[x_j|y_1...y_T]
"""
assert (x.shape == (self.n_states, 1))
assert (Sigma.shape == (self.n_states, self.n_states))
assert(next_x.shape == (self.n_states, 1))
assert(next_Sigma.shape == (self.n_states, self.n_states))
xp, Sigmap = self.predict_next_state(x, Sigma)
C = np.dot(Sigma, np.dot(self.A.T, np.linalg.pinv(Sigmap))) # Kalman smoothing gain
smooth_mean = x + np.dot(C, (next_x - xp))
smooth_covariance = Sigma + np.dot(C, np.dot((next_Sigma - Sigmap), C.T))
return smooth_mean, smooth_covariance
def smooth(self, y):
means, covs = self.filter(y)
n_data = y.shape[1]
xs = means[-1, :, :]
Sigmas = covs[-1, :, :]
smoothed_means, smoothed_covs = [xs], [Sigmas]
for j in range(n_data)[-2::-1]:
x = means[j, :, :]
Sigma = covs[j, :, :]
xs, Sigmas = self.smooth_next(x, Sigma, xs, Sigmas)
smoothed_means.append(xs)
smoothed_covs.append(Sigmas)
return np.array(smoothed_means)[::-1], np.array(smoothed_covs)[::-1]
def forecast(self, y, n_steps):
""" Forecasts or sample state/observation means and covariances n_steps time steps ahead."""
assert(n_steps > 0)
means, covs = self.filter(y)
xp, Sigmap = means[-1], covs[-1]
pred_state_means, pred_state_covs = [], []
pred_obs_means, pred_obs_covs = [], []
for _ in range(n_steps):
xp, Sigmap = self.predict_next_state(xp, Sigmap)
pred_state_means.append(xp)
pred_state_covs.append(Sigmap)
obs_mean, obs_cov = self.predict_observation(xp, Sigmap)
pred_obs_means.append(obs_mean)
pred_obs_covs.append(obs_cov)
return np.array(pred_state_means), np.array(pred_state_covs), \
np.array(pred_obs_means), np.array(pred_obs_covs)
def __str__(self):
if self.B is not None and self.u is not None:
Bu = f", B[{self.B.shape}], u[{self.u.shape}]"
else:
Bu = ""
name = f"{type(self).__name__}(x[{self.x.shape}], Sigma[{self.Sigma.shape}], "
name += f"A[{self.A.shape}], G[{self.G.shape}], Q[{self.Q.shape}], R[{self.R.shape}]{Bu})"
return name
class AdaptiveKalman(PlainKalmanFilter):
"""Simple Adaptive Kalman filter that adjust R (measurement noise covariance matrix)
based on rolling measurement data covariance. """
def __init__(self, x, Sigma, A, G, Q, R, B=None, u=None, window=10):
super().__init__(x, Sigma, A, G, Q, R, B, u)
self.window = window
self.Rold = self.R.copy()
def filter(self, y):
""" Performs sequential state update followed by measurement update.
Updates [R], on a rolling [window] interval.
Returns all {x_t}, {Sigma_t} given data {y_t}."""
assert (y.shape[0] == self.n_obs)
assert (y.shape[1] > self.window)
means = []
covs = []
xp, Sigmap = self.x, self.Sigma
for i, yi in enumerate(y.T):
obs = np.atleast_2d(yi).reshape(self.n_obs, 1)
xp, Sigmap = self.predict_next_state(xp, Sigmap)
xp, Sigmap = self.filter_state(obs, xp, Sigmap)
means.append(xp)
covs.append(Sigmap)
if i > self.window:
rolling_vars = np.std(y[:, (i-self.window):i], axis=1) ** 2
self.R = np.diag(rolling_vars).reshape(self.R.shape)
self.R = self.Rold
return np.array(means), np.array(covs)
State dynamics: $x_{t+1} = x_t$, therefore, $A=1$ , $Q=0$ and $x_0=\theta$
Measurement equation: $y_t = G\theta + v_t$, $ v_t \sim N(0,R)$, where $G=R=1$
Assume $\theta = 11$, $\hat{x}_0 = 6$, $\Sigma_0 = 7$
from scipy.stats import norm
theta = 11 # state x_t
A, Q, G, R = list(map(lambda x: np.matrix([x]), [1, 0, 1, 1]))
x_hat, Sigma = np.atleast_2d(6), np.atleast_2d(7)
N = 102
y = theta + np.random.randn(N)
fig, ax = plt.subplots(figsize=(10,8))
xgrid = np.linspace(theta - 3, theta + 3, 200)
pkf = PlainKalmanFilter(np.atleast_2d(x_hat), np.atleast_2d(Sigma), A, G, Q, R)
for i in range(N):
x_hat, Sigma = pkf.filter_state(np.atleast_2d(y[i]), x_hat, Sigma)
if i % 20 == 1:
ax.plot(xgrid, norm.pdf(xgrid, loc=x_hat, scale=np.sqrt(Sigma)).T, label=f'$t={i}$')
ax.set_title(f'First {N} densities when $\\theta = {theta:.1f}$')
ax.legend(loc='upper left')
plt.show()
We can observe how our prior distribution converges to expected mean after successive iterations.
Our state will be: $x_t = (x, \dot{x})^T$, and state evolution will be governed by:
$$x_{t} = A x_{t-1} + B_{t} a_{t},$$where $a_t \sim N(a, \sigma^2_a)$ is noisy acceleration, such that $w_t = B_{t}(a_{t}-a)$, and
$$ A = \begin{bmatrix} 1 & dt \\ 0 & 1 \end{bmatrix}, \ \ B = \begin{bmatrix} \frac{1}{2}dt^2 \\ dt \end{bmatrix}, \ \ Q = BB^T \sigma^2_a = \begin{bmatrix} \frac{1}{4}dt^4 & \frac{1}{2}dt^3 \\ \frac{1}{2}dt^3 & dt^2 \end{bmatrix} \sigma^2_a, \ \ $$The detector will measure position only, and our noisy measurement will be described by:
$$ y_t = G x_t + v_t,$$where $v_t \sim N(0, \sigma^2_x)$, and
$$ G = \begin{bmatrix} 1 & 0 \end{bmatrix}, \ \ R = \begin{bmatrix} \sigma^2_x \end{bmatrix}. $$The initial (prior) state mean and state covariance will be chosen to be:
$$ \hat{x}_{0|0} = \begin{bmatrix} 0.1 \\ 0.1 \end{bmatrix}, \ \ \Sigma_{0|0} = \begin{bmatrix} \sigma^2_x & 0 \\ 0 & \sigma^2_x \end{bmatrix}. $$dt = 0.1
a = 1.0
sigma_a = 1.0
sigma_x = 3.0
# transition matrix
A = np.array([[1, dt],
[0, 1]])
# transition covariance matrix
Q = np.array([[0.25 * dt ** 4, 0.5 * dt ** 3],
[0.5 * dt ** 3, dt ** 2]]) * sigma_a ** 2
G = np.array([[1.0, 0.0]])
R = np.array([[1.0]]) * sigma_x ** 2
# control matrix/vector
B = np.array([[0.5 * dt ** 2],
[dt]])
# control vector/scalar
u = a * np.array([[1.0]])
x_0 = np.array([[0.1],
[0.1]])
Sigma_0 = np.array([[sigma_x ** 2, 0.0],
[0.0, sigma_x ** 2]])
t_max = 10
N = int(t_max / dt)
x0 = 0.0
v0 = 1.0
t = np.linspace(0.0, t_max, N)
true_x = x0 + v0 * t + 0.5 * a * t ** 2
true_v = v0 + a * t
true = [true_x, true_v]
jumps = np.random.poisson(0.9, N)
noise = sigma_x * np.random.random(N)
observed_x = true_x + noise # + jumps
y = observed_x.reshape(1, N)
N.B. One can show that significant deviations from Gaussian distribution (e.g. when adding Poisson jumps) reduce the usefulness of discussed linear Kalman filter.
pkf = PlainKalmanFilter(x_0, Sigma_0, A, G, Q, R, B, u)
print(pkf)
xc, Sigmac = x_0, Sigma_0
means, covs = pkf.filter(y)
#############################################################################
smeans, scovs = pkf.smooth(y)
#############################################################################
n_steps = 50
fsmeans, fscovs, fomean, focovs = pkf.forecast(y, n_steps)
#############################################################################
akf = AdaptiveKalman(x_0, Sigma_0, A, G, Q, R, B, u, window=20)
admeans, adcovs = akf.filter(y)
adsmeans, adscovs = akf.smooth(y)
adfsmeans, adfscovs, adfomean, adfocovs = akf.forecast(y, n_steps)
def plot_kalman(means, covs, forecasted_means, forecasted_covs, true, num_sigmas, ax, title):
N = means.shape[0]
new_means = np.vstack([means, forecasted_means])
new_stds = np.sqrt(np.hstack([covs, forecasted_covs]))
new_t = np.arange(N + forecasted_means.shape[0])
state_var = new_means.ravel()
stds = new_stds.ravel()
ax.plot(new_t, state_var, 'b', linewidth=1.5, label="kalman")
ax.plot(new_t, state_var + num_sigmas * stds, 'k--', linewidth=0.8, label=f"{num_sigmas}-$\sigma$ band")
ax.plot(new_t, state_var - num_sigmas * stds, 'k--', linewidth=0.8)
ax.fill_between(new_t, state_var - num_sigmas * stds, state_var + 2 * stds, facecolor='green',
interpolate=True, alpha=0.5)
ax.plot(new_t[:N], true, 'r-', linewidth=1.5, label="true data")
ax.axvline(x=N, color='black', linewidth=0.8)
ax.set_title(title)
ax.legend(loc="best")
ax.fill_between(new_t, 1.1*min(state_var - num_sigmas * stds), 1.1*max(state_var + num_sigmas * stds),
where=new_t>=N, facecolor='yellow', alpha=0.1)
fig = plt.figure(figsize=(15,30))
j = 0
j1 = 1
ax1 = fig.add_subplot(8, 2, 1)
plot_kalman(means[:, j, :], covs[:, j, j], fsmeans[:, j, :], fscovs[:, j, j], true[j], num_sigmas=2,
ax=ax1, title="Position vs Time: Filtered")
ax2 = fig.add_subplot(8, 2, 2)
plot_kalman(means[:, j1, :], covs[:, j1, j1], fsmeans[:, j1, :], fscovs[:, j1, j1], true[j1], num_sigmas=2,
ax=ax2, title="Velocity vs Time: Filtered")
ax3 = fig.add_subplot(8, 2, 3)
plot_kalman(smeans[:, j, :], scovs[:, j, j], fsmeans[:, j, :], fscovs[:, j, j], true[j], num_sigmas=2,
ax=ax3, title="Position vs Time: Smoothed")
ax4 = fig.add_subplot(8, 2, 4)
plot_kalman(smeans[:, j1, :], scovs[:, j1, j1], fsmeans[:, j1, :], fscovs[:, j1, j1], true[j1], num_sigmas=2,
ax=ax4, title="Velocity vs Time: Smoothed")
ax5 = fig.add_subplot(8, 2, 5)
plot_kalman(admeans[:, j, :], adcovs[:, j, j], adfsmeans[:, j, :], adfscovs[:, j, j], true[j], num_sigmas=2,
ax=ax5, title="Position vs Time: Adaptive Kalman Filter")
ax6 = fig.add_subplot(8, 2, 6)
plot_kalman(admeans[:, j1, :], adcovs[:, j1, j1], adfsmeans[:, j1, :], adfscovs[:, j1, j1], true[j1], num_sigmas=2,
ax=ax6, title="Velocity vs Time: Adaptive Kalman Filter")
ax7 = fig.add_subplot(8, 2, 7)
plot_kalman(adsmeans[:, j, :], adscovs[:, j, j], adfsmeans[:, j, :], adfscovs[:, j, j], true[j], num_sigmas=2,
ax=ax7, title="Position vs Time: Adaptive Kalman Smoother")
ax8 = fig.add_subplot(8, 2, 8)
plot_kalman(adsmeans[:, j1, :], adscovs[:, j1, j1], adfsmeans[:, j1, :], adfscovs[:, j1, j1], true[j1], num_sigmas=2,
ax=ax8, title="Velocity vs Time: Adaptive Kalman Smoother")
plt.show()
from pykalman import KalmanFilter
kf = KalmanFilter(n_dim_obs=1, n_dim_state=2,
initial_state_mean=x_0.T.tolist()[0],
initial_state_covariance=Sigmac,
transition_matrices=A,
observation_matrices=G,
observation_covariance=R,
transition_covariance=Q,
transition_offsets=[0.5 * a * dt ** 2, a * dt]
)
filtered_state_means, filtered_state_covs = kf.filter(y)
smoothed_state_means, smoothed_state_covs = kf.smooth(y)
#############################################################################
kf_em = KalmanFilter(transition_matrices=A, observation_matrices=G)
kf_em = kf_em.em(y, n_iter=20)
filtered_state_means_em, _ = kf_em.filter(y)
smoothed_state_means_em, _ = kf_em.smooth(y)
future_states, _ = kf.sample(n_steps, initial_state=filtered_state_means[-1])
kf_em.em_vars
fig = plt.figure(figsize=(24,12))
j = 0
ax1 = fig.add_subplot(1, 2, 1)
plot_kalman(filtered_state_means[:, j].reshape(-1,1), filtered_state_covs[:, j, j], future_states[:, j].reshape(-1,1), fscovs[:, j, j], true[j], num_sigmas=2,
ax=ax1, title="Position vs Time: PyKalman Filtered")
ax2 = fig.add_subplot(1, 2, 2)
plot_kalman(means[:, j, :], covs[:, j, j], fsmeans[:,j,:], fscovs[:, j, j], true[j], num_sigmas=2,
ax=ax2, title="Position vs Time: Plain Kalman Filtered")
fig = plt.figure(figsize=plt.figaspect(0.3))
ax = fig.add_subplot(1, 2, 1)
ax.plot(t, means[:, 0], 'r-', linewidth=1.0, label="plain kalman filter")
ax.plot(t, admeans[:, 0], 'g--', linewidth=1.0, label="adaptive plain kalman filter")
ax.plot(t, smeans[:, 0], 'c--', linewidth=1.0, label="plain kalman smoother")
ax.plot(t, filtered_state_means[:, 0], 'b-', linewidth=1.0, label="pykalman filter")
# ax.plot(t, filtered_state_means_em[:,0], 'g-', linewidth=1.0, label="pykalman_em filter")
ax.plot(t, smoothed_state_means[:, 0], 'y-.', linewidth=0.5, label="pykalman smoother")
# ax.plot(t, smoothed_state_means_em[:,0], 'c-.', linewidth=0.5, label="pykalman_em smoother")
ax.plot(t, true_x, 'k--', linewidth=0.8, label="true data")
ax.legend(loc="best")
ax.set_title("Position: Kalman vs. true data")
ax2 = fig.add_subplot(1, 2, 2)
ax2.plot(t, means[:, 1], 'r-', linewidth=1.0, label="plain kalman filter")
ax2.plot(t, admeans[:, 1], 'g--', linewidth=1.0, label="adaptive plain kalman filter")
ax2.plot(t, smeans[:, 1], 'c--', linewidth=1.0, label="plain kalman smoother")
ax2.plot(t, adsmeans[:, 1], 'b--', linewidth=1.0, label="adaptive plain kalman smoother")
ax2.plot(t, filtered_state_means[:, 1], 'b-', linewidth=1.0, label="pykalman filter")
# ax2.plot(t, filtered_state_means_em[:,1], 'g-', linewidth=1.0, label="pykalman_em filter")
ax2.plot(t, smoothed_state_means[:, 1], 'y-.', linewidth=0.5, label="pykalman smoother")
# ax2.plot(t, smoothed_state_means_em[:,1], 'c-.', linewidth=0.5, label="pykalman_em smoother")
ax2.plot(t, true_v, 'k--', linewidth=0.8, label="true data")
ax2.legend(loc="best")
ax2.set_title("Velocity: Kalman vs. true data")
plt.show()
fig = plt.figure(figsize=plt.figaspect(0.3))
ax = fig.add_subplot(1, 2, 1)
ax.plot(t, np.array(y).ravel() - true_x, 'rx', linewidth=1.0, label="observed")
ax.plot(t, np.array(means[:, 0]).ravel() - true_x, linewidth=1.0, label="plain kalman filter")
ax.plot(t, np.array(smeans[:, 0]).ravel() - true_x, linewidth=1.5, label="plain kalman smoother")
ax.plot(t, np.array(adsmeans[:, 0]).ravel() - true_x, linewidth=1.5, label="adaptive plain kalman smoother")
ax.plot(t, np.array(admeans[:, 0]).ravel() - true_x, linewidth=1.5, label="adaptive plain kalman filter")
ax.plot(t, filtered_state_means[:, 0] - true_x, linewidth=1.0, label="pykalman filter")
# ax.plot(t, filtered_state_means_em[:, 0] - true_x, 'g-', linewidth=1.0, label="pykalman_em filter")
ax.plot(t, smoothed_state_means[:, 0] - true_x, linewidth=2.5, label="pykalman smoother")
# ax.plot(t, smoothed_state_means_em[:, 0] - true_x, 'c-.', linewidth=0.5, label="pykalman_em smoother")
ax.legend(loc="best")
ax.set_title("Position: Kalman - true data")
ax2 = fig.add_subplot(1, 2, 2)
ax2.plot(t, np.array(means[:, 1]).ravel() - true_v, linewidth=1.0, label="plain kalman filter")
ax2.plot(t, np.array(admeans[:, 1]).ravel() - true_v, linewidth=1.0, label="adaptive plain kalman filter")
ax2.plot(t, np.array(smeans[:, 1]).ravel() - true_v, linewidth=1.5, label="plain kalman smoother")
ax2.plot(t, np.array(adsmeans[:, 1]).ravel() - true_v, linewidth=1.5, label="adaptive plain kalman smoother")
ax2.plot(t, filtered_state_means[:, 1] - true_v, linewidth=1.0, label="pykalman filter")
# ax2.plot(t, filtered_state_means_em[:, 1] - true_v, 'g-', linewidth=1.0, label="pykalman_em filter")
ax2.plot(t, smoothed_state_means[:, 1] - true_v, linewidth=2.5, label="pykalman smoother")
# ax2.plot(t, smoothed_state_means_em[:, 1] - true_v, 'c-.', linewidth=0.5, label="pykalman_em smoother")
ax2.legend(loc="best")
ax2.set_title("Velocity: Kalman - true data")
plt.show()
We will do forecasting, to find the next observation by continuously updating Kalman filter parameters on a given window.
window = 30
pos = []
vel = []
pkf = PlainKalmanFilter(x_0, Sigma_0, A, G, Q, R, B, u)
print(pkf)
for i in range(window, N):
obs = y.T[:i]#[-window:]
next_mean, next_cov, _, _ = pkf.forecast(obs.T, 1)
pos.append(next_mean[:,0])
vel.append(next_mean[:,1])
pos = np.array(pos)[:,0,0]
vel = np.array(vel)[:,0,0]
fig = plt.figure(figsize=plt.figaspect(0.3))
ax = fig.add_subplot(1, 1, 1)
ax.plot(t[window:], true_x[window:], 'k--', linewidth=0.8, label="true data")
ax.plot(t[window:], pos, 'g-', linewidth=0.8, label="sequential Kalman")
ax.plot(t[window:], filtered_state_means[window:,0], 'r-', linewidth=1.2, label="pykalman filter")
ax.plot(t[window:], filtered_state_means_em[window:,0], 'y-', linewidth=1.2, label="pykalman_em filter")
ax.plot(t[window:], np.array(means[:,0]).ravel()[window:], 'b-', linewidth=1.0, label="plain kalman filter")
ax.legend(loc="upper left")
ax.set_title("Position: Kalmans vs. true data")
plt.show()
Comparisons essentially show that there is no significant difference between discussed filters (same for smoothers).
Our state will be: $x_t = (x_1, \dot{x}_1, \ddot{x}_1, x_2, \dot{x}_2, \ddot{x}_2)^T$, the transition, transition covariance and observation matrices will be:
$$ A = \begin{bmatrix} 1 & dt & dt^2/2 & 0 & 0 & 0 \\ 0 & 1 & dt & 0 & 0 & 0 \\ 0 & 0 & 1 & 0 & 0 & 0 \\ 0 & 0 & 0 & 1 & dt & dt^2/2\\ 0 & 0 & 0 & 0 & 1 & dt \\ 0 & 0 & 0 & 0 & 0 & 1 \end{bmatrix} , $$$$ Q = \begin{bmatrix} \frac{1}{4}dt^4 & \frac{1}{2}dt^3 & \frac{1}{2}dt^2 & 0 & 0 & 0 \\ \frac{1}{2}dt^3 & dt^2 & dt & 0 & 0 & 0 \\ \frac{1}{2}dt^2 & dt & 1 & 0 & 0 & 0 \\ 0 & 0 & 0 & \frac{1}{4}dt^4 & \frac{1}{2}dt^3 & \frac{1}{2}dt^2 \\ 0 & 0 & 0 & \frac{1}{2}dt^3 & dt^2 & dt \\ 0 & 0 & 0 & \frac{1}{2}dt^2 & dt & 1 \\ \end{bmatrix} \sigma^2_a, $$ $$ G = \begin{bmatrix} 1 & 0 & 0 & 0 & 0 & 0\\ 0 & 0 & 0 & 1 & 0 & 0 \end{bmatrix}. $$sigma_a = 0.9
sigma_x = 3.5
# transition matrix
A = np.array([[1, dt, 0.5 * dt ** 2, 0, 0, 0],
[0, 1, dt, 0, 0, 0],
[0, 0, 1, 0, 0, 0],
[0, 0, 0, 1, dt, 0.5 * dt ** 2],
[0, 0, 0, 0, 1, dt],
[0, 0, 0, 0, 0, 1]])
# transition covariance matrix
Q = np.array([[0.25 * dt ** 4, 0.5 * dt ** 3, 0.5 * dt ** 2, 0, 0, 0],
[0.5 * dt ** 3, dt ** 2, dt, 0, 0, 0],
[0.5 * dt ** 2, dt, 1, 0, 0, 0],
[0, 0, 0, 0.25 * dt ** 4, 0.5 * dt ** 3, 0.5 * dt ** 2],
[0, 0, 0, 0.5 * dt ** 3, dt ** 2, dt],
[0, 0, 0, 0.5 * dt ** 2, dt, 1]]) * sigma_a ** 2
G = np.array([[1, 0, 0, 0, 0, 0],
[0, 0, 0, 1, 0, 0]])
R = np.array([[1, 0],
[0, 1]]) * sigma_x ** 2
B = None
u = None
x_0 = np.array([0.1, 0.1, 0.1, 0, 0, 8]).reshape(6, 1)
Sigma_0 = (sigma_x ** 2) * np.eye(6)
n_iter = 10
dt = 0.1
t_max = 10
N = int(t_max/dt)
x10, x20 = 0.0, 0.0
v10, v20 = 0.0, 0.0
a1, a2 = 1.0, 10.0
t = np.linspace(0.0, t_max, N)
true_x1 = x10 + v10 * t + 0.5 * a1 * t ** 2
true_x2 = x20 + v20 * t + 0.5 * a2 * t ** 2
true_v1 = v10 + a1 * t
true_v2 = v20 + a2 * t
true = [true_x1, true_v1, a1*np.ones(N), true_x2, true_v2, a2*np.ones(N)]
observed_x1 = true_x1 + sigma_x * np.random.random(N) #+ np.random.poisson(2.0, N)
observed_x2 = true_x2 + sigma_x * np.random.random(N) #+ np.random.poisson(2.0, N)
y = np.vstack((observed_x1, observed_x2)).reshape(2, N)
pkf = PlainKalmanFilter(x_0, Sigma_0, A, G, Q, R, B, u)
print(pkf)
xc, Sigmac = x_0, Sigma_0
means, covs = pkf.filter(y)
#############################################################################
smeans, scovs = pkf.smooth(y)
#############################################################################
n_steps = 50
fsmeans, fscovs, fomean, focovs = pkf.forecast(y, n_steps)
#############################################################################
akf = AdaptiveKalman(x_0, Sigma_0, A, G, Q, R, B, u, window=20)
admeans, adcovs = akf.filter(y)
adsmeans, adscovs = akf.smooth(y)
adfsmeans, adfscovs, adfomean, adfocovs = akf.forecast(y, n_steps)
fig = plt.figure(figsize=(15,30))
j = 0
j1 = 1
ax1 = fig.add_subplot(8, 2, 1)
plot_kalman(means[:, j, :], covs[:, j, j], fsmeans[:, j, :], fscovs[:, j, j], true[j], num_sigmas=2,
ax=ax1, title="Position vs Time: Filtered")
ax2 = fig.add_subplot(8, 2, 2)
plot_kalman(means[:, j1, :], covs[:, j1, j1], fsmeans[:, j1, :], fscovs[:, j1, j1], true[j1], num_sigmas=2,
ax=ax2, title="Velocity vs Time: Filtered")
ax3 = fig.add_subplot(8, 2, 3)
plot_kalman(smeans[:, j, :], scovs[:, j, j], fsmeans[:, j, :], fscovs[:, j, j], true[j], num_sigmas=2,
ax=ax3, title="Position vs Time: Smoothed")
ax4 = fig.add_subplot(8, 2, 4)
plot_kalman(smeans[:, j1, :], scovs[:, j1, j1], fsmeans[:, j1, :], fscovs[:, j1, j1], true[j1], num_sigmas=2,
ax=ax4, title="Velocity vs Time: Smoothed")
ax5 = fig.add_subplot(8, 2, 5)
plot_kalman(admeans[:, j, :], adcovs[:, j, j], adfsmeans[:, j, :], adfscovs[:, j, j], true[j], num_sigmas=2,
ax=ax5, title="Position vs Time: Adaptive Kalman Filter")
ax6 = fig.add_subplot(8, 2, 6)
plot_kalman(admeans[:, j1, :], adcovs[:, j1, j1], adfsmeans[:, j1, :], adfscovs[:, j1, j1], true[j1], num_sigmas=2,
ax=ax6, title="Velocity vs Time: Adaptive Kalman Filter")
ax7 = fig.add_subplot(8, 2, 7)
plot_kalman(adsmeans[:, j, :], adscovs[:, j, j], adfsmeans[:, j, :], adfscovs[:, j, j], true[j], num_sigmas=2,
ax=ax7, title="Position vs Time: Adaptive Kalman Smoother")
ax8 = fig.add_subplot(8, 2, 8)
plot_kalman(adsmeans[:, j1, :], adscovs[:, j1, j1], adfsmeans[:, j1, :], adfscovs[:, j1, j1], true[j1], num_sigmas=2,
ax=ax8, title="Velocity vs Time: Adaptive Kalman Smoother")
plt.show()
fig = plt.figure(figsize=(15,30))
j = 3
j1 = 4
ax1 = fig.add_subplot(8, 2, 1)
plot_kalman(means[:, j, :], covs[:, j, j], fsmeans[:, j, :], fscovs[:, j, j], true[j], num_sigmas=2,
ax=ax1, title="Position vs Time: Filtered")
ax2 = fig.add_subplot(8, 2, 2)
plot_kalman(means[:, j1, :], covs[:, j1, j1], fsmeans[:, j1, :], fscovs[:, j1, j1], true[j1], num_sigmas=2,
ax=ax2, title="Velocity vs Time: Filtered")
ax3 = fig.add_subplot(8, 2, 3)
plot_kalman(smeans[:, j, :], scovs[:, j, j], fsmeans[:, j, :], fscovs[:, j, j], true[j], num_sigmas=2,
ax=ax3, title="Position vs Time: Smoothed")
ax4 = fig.add_subplot(8, 2, 4)
plot_kalman(smeans[:, j1, :], scovs[:, j1, j1], fsmeans[:, j1, :], fscovs[:, j1, j1], true[j1], num_sigmas=2,
ax=ax4, title="Velocity vs Time: Smoothed")
ax5 = fig.add_subplot(8, 2, 5)
plot_kalman(admeans[:, j, :], adcovs[:, j, j], adfsmeans[:, j, :], adfscovs[:, j, j], true[j], num_sigmas=2,
ax=ax5, title="Position vs Time: Adaptive Kalman Filter")
ax6 = fig.add_subplot(8, 2, 6)
plot_kalman(admeans[:, j1, :], adcovs[:, j1, j1], adfsmeans[:, j1, :], adfscovs[:, j1, j1], true[j1], num_sigmas=2,
ax=ax6, title="Velocity vs Time: Adaptive Kalman Filter")
ax7 = fig.add_subplot(8, 2, 7)
plot_kalman(adsmeans[:, j, :], adscovs[:, j, j], adfsmeans[:, j, :], adfscovs[:, j, j], true[j], num_sigmas=2,
ax=ax7, title="Position vs Time: Adaptive Kalman Smoother")
ax8 = fig.add_subplot(8, 2, 8)
plot_kalman(adsmeans[:, j1, :], adscovs[:, j1, j1], adfsmeans[:, j1, :], adfscovs[:, j1, j1], true[j1], num_sigmas=2,
ax=ax8, title="Velocity vs Time: Adaptive Kalman Smoother")
plt.show()
fig = plt.figure(figsize=(15,30))
j = 2
j1 = 5
ax1 = fig.add_subplot(8, 2, 1)
plot_kalman(means[:, j, :], covs[:, j, j], fsmeans[:, j, :], fscovs[:, j, j], true[j], num_sigmas=2,
ax=ax1, title="$a_1$ vs Time: Filtered")
ax2 = fig.add_subplot(8, 2, 2)
plot_kalman(means[:, j1, :], covs[:, j1, j1], fsmeans[:, j1, :], fscovs[:, j1, j1], true[j1], num_sigmas=2,
ax=ax2, title="$a_2$ vs Time: Filtered")
ax3 = fig.add_subplot(8, 2, 3)
plot_kalman(smeans[:, j, :], scovs[:, j, j], fsmeans[:, j, :], fscovs[:, j, j], true[j], num_sigmas=2,
ax=ax3, title="$a_1$ vs Time: Smoothed")
ax4 = fig.add_subplot(8, 2, 4)
plot_kalman(smeans[:, j1, :], scovs[:, j1, j1], fsmeans[:, j1, :], fscovs[:, j1, j1], true[j1], num_sigmas=2,
ax=ax4, title="$a_2$ vs Time: Smoothed")
ax5 = fig.add_subplot(8, 2, 5)
plot_kalman(admeans[:, j, :], adcovs[:, j, j], adfsmeans[:, j, :], adfscovs[:, j, j], true[j], num_sigmas=2,
ax=ax5, title="$a_1$ vs Time: Adaptive Kalman Filter")
ax6 = fig.add_subplot(8, 2, 6)
plot_kalman(admeans[:, j1, :], adcovs[:, j1, j1], adfsmeans[:, j1, :], adfscovs[:, j1, j1], true[j1], num_sigmas=2,
ax=ax6, title="$a_2$ vs Time: Adaptive Kalman Filter")
ax7 = fig.add_subplot(8, 2, 7)
plot_kalman(adsmeans[:, j, :], adscovs[:, j, j], adfsmeans[:, j, :], adfscovs[:, j, j], true[j], num_sigmas=2,
ax=ax7, title="$a_1$ vs Time: Adaptive Kalman Smoother")
ax8 = fig.add_subplot(8, 2, 8)
plot_kalman(adsmeans[:, j1, :], adscovs[:, j1, j1], adfsmeans[:, j1, :], adfscovs[:, j1, j1], true[j1], num_sigmas=2,
ax=ax8, title="$a_2$ vs Time: Adaptive Kalman Smoother")
plt.show()
kf = KalmanFilter(initial_state_mean=x_0.T.tolist()[0],
initial_state_covariance=Sigma_0,
transition_matrices=A,
transition_covariance=Q,
observation_matrices=G,
observation_covariance=R)
# kf = KalmanFilter(transition_matrices=A, observation_matrices=G)
# kf = kf.em(measurements, n_iter=n_iter)
(filtered_state_means, filtered_state_covariances) = kf.filter(y.T.tolist())
(smoothed_state_means, smoothed_state_covariances) = kf.smooth(y.T.tolist())
fig = plt.figure(figsize=plt.figaspect(0.3))
ax = fig.add_subplot(1, 2, 1)
ax.plot(t, filtered_state_means[:,0]-true_x1, 'r-', linewidth=1.2, label="pykalman filter")
ax.plot(t, smoothed_state_means[:,0]-true_x1, 'g-', linewidth=1.2, label="pykalman smoother")
ax.plot(t, np.array(means[:, 0]).ravel() - true_x1, 'k--', linewidth=1.0, label="plain kalman filter")
ax.plot(t, np.array(smeans[:, 0]).ravel() - true_x1, 'c-', linewidth=1.5, label="plain kalman smoother")
#ax.plot(t, true_x1, 'k--', linewidth=0.8, label="true data")
ax.plot(t, observed_x1-true_x1, 'b-.', linewidth=0.8, label="observed data")
ax.legend(loc="upper left")
ax.set_title("Position-x: Kalmans - true data")
ax2 = fig.add_subplot(1, 2, 2)
ax2.plot(t, filtered_state_means[:,1]-true_v1, 'r-', linewidth=1.2, label="pykalman filter")
ax2.plot(t, smoothed_state_means[:,1]-true_v1, 'g-', linewidth=1.2, label="pykalman smoother")
ax2.plot(t, np.array(means[:, 1]).ravel() - true_v1, 'k--', linewidth=1.0, label="plain kalman filter")
ax2.plot(t, np.array(smeans[:, 1]).ravel() - true_v1, 'c-', linewidth=1.5, label="plain kalman smoother")
ax2.legend(loc="upper left")
ax2.set_title("Velocity-x: Kalmans - true data")
plt.show()
fig = plt.figure(figsize=plt.figaspect(0.3))
ax = fig.add_subplot(1, 2, 1)
ax.plot(t, filtered_state_means[:,3]-true_x2, 'r-', linewidth=1.2, label="pykalman filter")
ax.plot(t, smoothed_state_means[:,3]-true_x2, 'g-', linewidth=1.2, label="pykalman smoother")
ax.plot(t, observed_x2-true_x2, 'b-.', linewidth=0.8, label="observed data")
ax.legend(loc="upper left")
ax.set_title("Position-y: Kalmans - true data")
ax2 = fig.add_subplot(1, 2, 2)
ax2.plot(t, filtered_state_means[:,4]-true_v2, 'r-', linewidth=1.2, label="pykalman filter")
ax2.plot(t, smoothed_state_means[:,4]-true_v2, 'g-', linewidth=1.2, label="pykalman smoother")
ax2.legend(loc="upper left")
ax2.set_title("Velocity-y: Kalmans - true data")
plt.show()
error1 = [(means[:,i,0] - true[i]).std() for i in range(6)]
error2 = [(smeans[:,i,0] - true[i]).std() for i in range(6)]
error3 = [(admeans[:,i,0] - true[i]).std() for i in range(6)]
error4 = [(adsmeans[:,i,0] - true[i]).std() for i in range(6)]
error5 = [(filtered_state_means[:,i] - true[i]).std() for i in range(6)]
error6 = [(smoothed_state_means[:,i] - true[i]).std() for i in range(6)]
[sum(e) for e in [error1, error2, error3, error4, error5, error6]]
Assumption of update rates:
1.Accelerometer collects data (from IMU) at 10 Hz
2.GPS collects data (lon/lat/alt) with at 1 Hz
We will measure $(x_1, x_2, a_1, a_2)$
dt = 0.1
sigma_x = 10.0
sigma_a = 1.0
# transition matrix
A = np.matrix([[1, dt, 0.5 * dt ** 2, 0, 0, 0],
[0, 1, dt, 0, 0, 0],
[0, 0, 1, 0, 0, 0],
[0, 0, 0, 1, dt, 0.5 * dt ** 2],
[0, 0, 0, 0, 1, dt],
[0, 0, 0, 0, 0, 1]])
# transition covariance matrix
Q = np.matrix([[0.25 * dt ** 4, 0.5 * dt ** 3, 0.5 * dt ** 2, 0, 0, 0],
[0.5 * dt ** 3, dt ** 2, dt, 0, 0, 0],
[0.5 * dt ** 2, dt, 1, 0, 0, 0],
[0, 0, 0, 0.25 * dt ** 4, 0.5 * dt ** 3, 0.5 * dt ** 2],
[0, 0, 0, 0.5 * dt ** 3, dt ** 2, dt],
[0, 0, 0, 0.5 * dt ** 2, dt, 1]]) * sigma_a ** 2
G = np.matrix([[1, 0, 0, 0, 0, 0],
[0, 0, 0, 1, 0, 0],
[0, 0, 1, 0, 0, 0],
[0, 0, 0, 0, 0, 1]])
R = np.matrix([[sigma_x**2, 0.0, 0.0, 0.0],
[0.0, sigma_x**2, 0.0, 0.0],
[0.0, 0.0, sigma_a**2, 0.0],
[0.0, 0.0, 0.0, sigma_a**2]])
B = None
u = None
x_0 = np.matrix([0.0, 1.0, 0.1, 0.0, 0.0, -1.0]).T
Sigma_0 = np.eye(6) * sigma_x ** 2
t_max = 10
N = int(t_max/dt)
x10 = 0
x20 = 0
v10 = 10.5
v20 = 1.5
a1 = 1.1
a2 = -1.1
t = np.linspace(0.0, t_max, N)
true_x1 = x10 + v10 * t + 0.5 * a1 * t ** 2
true_x2 = x20 + v20 * t + 0.5 * a2 * t ** 2
true_v1 = v10 + a1 * t
true_v2 = v20 + a2 * t
obs_x1 = true_x1 + sigma_x * np.random.randn(N)
obs_x2 = true_x2 + sigma_x * np.random.randn(N)
obs_a1 = a1 + sigma_a * np.random.randn(N)
obs_a2 = a2 + sigma_a * np.random.randn(N)
true = [true_x1, true_v1, a1*np.ones(N), true_x2, true_v2, a2*np.ones(N)]
# Generate GPS vector t-th entry of which is true if there is GPS data at time t, and false othewise
GPS = np.ndarray(N, dtype='bool')
GPS[0] = True
# Less new position updates
for i in range(1, N):
if i % 10 == 0:
GPS[i]=True
else:
obs_x1[i] = obs_x1[i-1]
obs_x2[i] = obs_x2[i-1]
GPS[i]=False
y = np.vstack((obs_x1, obs_x2, obs_a1, obs_a2))
fig = plt.figure(figsize=(16,9))
plt.subplot(211)
plt.step(t, obs_x1, label='$x_1$')
plt.step(t, obs_x2, label='$x_2$')
plt.ylabel(r'Position $(meters)$')
plt.title('x-measurements (low frequency)')
#plt.ylim([-10, 10])
plt.legend(loc='best',prop={'size':18})
plt.subplot(212)
plt.step(t, obs_a1, label='$a_1$')
plt.step(t, obs_a2, label='$a_2$')
plt.ylabel(r'Acceleration $(meters/sec^2)$')
plt.xlabel('time')
#plt.ylim([-1, 1])
plt.title('a-measurements (high frequency)')
plt.legend(loc='best',prop={'size':18})
plt.show()
pkf = PlainKalmanFilter(x_0, Sigma_0, A, G, Q, R, B, u)
print(pkf)
xc, Sigmac = x_0, Sigma_0
means, covs = pkf.filter(y)
#############################################################################
smeans, scovs = pkf.smooth(y)
#############################################################################
n_steps = 15
fsmeans, fscovs, fomean, focovs = pkf.forecast(y, n_steps)
#############################################################################
akf = AdaptiveKalman(x_0, Sigma_0, A, G, Q, R, B, u, window=20)
admeans, adcovs = akf.filter(y)
adsmeans, adscovs = akf.smooth(y)
adfsmeans, adfscovs, adfomean, adfocovs = akf.forecast(y, n_steps)
fig = plt.figure(figsize=(15,30))
j = 0
j1 = 1
ax1 = fig.add_subplot(8, 2, 1)
plot_kalman(means[:, j, :], covs[:, j, j], fsmeans[:, j, :], fscovs[:, j, j], true[j], num_sigmas=2,
ax=ax1, title="Position vs Time: Filtered")
ax2 = fig.add_subplot(8, 2, 2)
plot_kalman(means[:, j1, :], covs[:, j1, j1], fsmeans[:, j1, :], fscovs[:, j1, j1], true[j1], num_sigmas=2,
ax=ax2, title="Velocity vs Time: Filtered")
ax3 = fig.add_subplot(8, 2, 3)
plot_kalman(smeans[:, j, :], scovs[:, j, j], fsmeans[:, j, :], fscovs[:, j, j], true[j], num_sigmas=2,
ax=ax3, title="Position vs Time: Smoothed")
ax4 = fig.add_subplot(8, 2, 4)
plot_kalman(smeans[:, j1, :], scovs[:, j1, j1], fsmeans[:, j1, :], fscovs[:, j1, j1], true[j1], num_sigmas=2,
ax=ax4, title="Velocity vs Time: Smoothed")
ax5 = fig.add_subplot(8, 2, 5)
plot_kalman(admeans[:, j, :], adcovs[:, j, j], adfsmeans[:, j, :], adfscovs[:, j, j], true[j], num_sigmas=2,
ax=ax5, title="Position vs Time: Adaptive Kalman Filter")
ax6 = fig.add_subplot(8, 2, 6)
plot_kalman(admeans[:, j1, :], adcovs[:, j1, j1], adfsmeans[:, j1, :], adfscovs[:, j1, j1], true[j1], num_sigmas=2,
ax=ax6, title="Velocity vs Time: Adaptive Kalman Filter")
ax7 = fig.add_subplot(8, 2, 7)
plot_kalman(adsmeans[:, j, :], adscovs[:, j, j], adfsmeans[:, j, :], adfscovs[:, j, j], true[j], num_sigmas=2,
ax=ax7, title="Position vs Time: Adaptive Kalman Smoother")
ax8 = fig.add_subplot(8, 2, 8)
plot_kalman(adsmeans[:, j1, :], adscovs[:, j1, j1], adfsmeans[:, j1, :], adfscovs[:, j1, j1], true[j1], num_sigmas=2,
ax=ax8, title="Velocity vs Time: Adaptive Kalman Smoother")
plt.show()
fig = plt.figure(figsize=(15,30))
j = 3
j1 = 4
ax1 = fig.add_subplot(8, 2, 1)
plot_kalman(means[:, j, :], covs[:, j, j], fsmeans[:, j, :], fscovs[:, j, j], true[j], num_sigmas=2,
ax=ax1, title="Position vs Time: Filtered")
ax2 = fig.add_subplot(8, 2, 2)
plot_kalman(means[:, j1, :], covs[:, j1, j1], fsmeans[:, j1, :], fscovs[:, j1, j1], true[j1], num_sigmas=2,
ax=ax2, title="Velocity vs Time: Filtered")
ax3 = fig.add_subplot(8, 2, 3)
plot_kalman(smeans[:, j, :], scovs[:, j, j], fsmeans[:, j, :], fscovs[:, j, j], true[j], num_sigmas=2,
ax=ax3, title="Position vs Time: Smoothed")
ax4 = fig.add_subplot(8, 2, 4)
plot_kalman(smeans[:, j1, :], scovs[:, j1, j1], fsmeans[:, j1, :], fscovs[:, j1, j1], true[j1], num_sigmas=2,
ax=ax4, title="Velocity vs Time: Smoothed")
ax5 = fig.add_subplot(8, 2, 5)
plot_kalman(admeans[:, j, :], adcovs[:, j, j], adfsmeans[:, j, :], adfscovs[:, j, j], true[j], num_sigmas=2,
ax=ax5, title="Position vs Time: Adaptive Kalman Filter")
ax6 = fig.add_subplot(8, 2, 6)
plot_kalman(admeans[:, j1, :], adcovs[:, j1, j1], adfsmeans[:, j1, :], adfscovs[:, j1, j1], true[j1], num_sigmas=2,
ax=ax6, title="Velocity vs Time: Adaptive Kalman Filter")
ax7 = fig.add_subplot(8, 2, 7)
plot_kalman(adsmeans[:, j, :], adscovs[:, j, j], adfsmeans[:, j, :], adfscovs[:, j, j], true[j], num_sigmas=2,
ax=ax7, title="Position vs Time: Adaptive Kalman Smoother")
ax8 = fig.add_subplot(8, 2, 8)
plot_kalman(adsmeans[:, j1, :], adscovs[:, j1, j1], adfsmeans[:, j1, :], adfscovs[:, j1, j1], true[j1], num_sigmas=2,
ax=ax8, title="Velocity vs Time: Adaptive Kalman Smoother")
plt.show()
fig = plt.figure(figsize=(15,30))
j = 2
j1 = 5
ax1 = fig.add_subplot(8, 2, 1)
plot_kalman(means[:, j, :], covs[:, j, j], fsmeans[:, j, :], fscovs[:, j, j], true[j], num_sigmas=2,
ax=ax1, title="$a_1$ vs Time: Filtered")
ax2 = fig.add_subplot(8, 2, 2)
plot_kalman(means[:, j1, :], covs[:, j1, j1], fsmeans[:, j1, :], fscovs[:, j1, j1], true[j1], num_sigmas=2,
ax=ax2, title="$a_2$ vs Time: Filtered")
ax3 = fig.add_subplot(8, 2, 3)
plot_kalman(smeans[:, j, :], scovs[:, j, j], fsmeans[:, j, :], fscovs[:, j, j], true[j], num_sigmas=2,
ax=ax3, title="$a_1$ vs Time: Smoothed")
ax4 = fig.add_subplot(8, 2, 4)
plot_kalman(smeans[:, j1, :], scovs[:, j1, j1], fsmeans[:, j1, :], fscovs[:, j1, j1], true[j1], num_sigmas=2,
ax=ax4, title="$a_2$ vs Time: Smoothed")
ax5 = fig.add_subplot(8, 2, 5)
plot_kalman(admeans[:, j, :], adcovs[:, j, j], adfsmeans[:, j, :], adfscovs[:, j, j], true[j], num_sigmas=2,
ax=ax5, title="$a_1$ vs Time: Adaptive Kalman Filter")
ax6 = fig.add_subplot(8, 2, 6)
plot_kalman(admeans[:, j1, :], adcovs[:, j1, j1], adfsmeans[:, j1, :], adfscovs[:, j1, j1], true[j1], num_sigmas=2,
ax=ax6, title="$a_2$ vs Time: Adaptive Kalman Filter")
ax7 = fig.add_subplot(8, 2, 7)
plot_kalman(adsmeans[:, j, :], adscovs[:, j, j], adfsmeans[:, j, :], adfscovs[:, j, j], true[j], num_sigmas=2,
ax=ax7, title="$a_1$ vs Time: Adaptive Kalman Smoother")
ax8 = fig.add_subplot(8, 2, 8)
plot_kalman(adsmeans[:, j1, :], adscovs[:, j1, j1], adfsmeans[:, j1, :], adfscovs[:, j1, j1], true[j1], num_sigmas=2,
ax=ax8, title="$a_2$ vs Time: Adaptive Kalman Smoother")
plt.show()
class AsyncKalman(PlainKalmanFilter):
"""Simple Asynchronous Kalman filter that allows to treat cases
when observables arrive asynchronously."""
def __init__(self, x, Sigma, A, G, Q, R, GPS, B=None, u=None):
super().__init__(x, Sigma, A, G, Q, R, B, u)
self.GPS = GPS
def filter(self, y):
""" Performs sequential state update followed by measurement update
if GPS signal/data is available.
GPS is a boolean array of same length as y.
Returns all {x_t}, {Sigma_t} given data {y_t}."""
assert (y.shape[0] == self.n_obs)
assert (y.shape[1] == self.GPS.shape[0])
means = []
covs = []
xp, Sigmap = self.x, self.Sigma
for i, yi in enumerate(y.T):
obs = np.atleast_2d(yi).reshape(self.n_obs, 1)
xp, Sigmap = self.predict_next_state(xp, Sigmap)
if self.GPS[i]:
xp, Sigmap = self.filter_state(obs, xp, Sigmap)
means.append(xp)
covs.append(Sigmap)
return np.array(means), np.array(covs)
class AsyncAdaptiveKalman(AsyncKalman):
"""Simple Adaptive Kalman adjusted to Asynchronous Kalman filter. """
def __init__(self, x, Sigma, A, G, Q, R, GPS, B=None, u=None, window=10):
super().__init__(x, Sigma, A, G, Q, R, GPS, B, u)
self.window = window
self.Rold = self.R.copy()
def filter(self, y):
""" Performs sequential state update followed by measurement update.
Updates [R], on a rolling [window] interval.
Returns all {x_t}, {Sigma_t} given data {y_t}."""
assert (y.shape[0] == self.n_obs)
assert (y.shape[1] > self.window)
means = []
covs = []
xp, Sigmap = self.x, self.Sigma
for i, yi in enumerate(y.T):
obs = np.atleast_2d(yi).reshape(self.n_obs, 1)
xp, Sigmap = self.predict_next_state(xp, Sigmap)
if self.GPS[i]:
xp, Sigmap = self.filter_state(obs, xp, Sigmap)
means.append(xp)
covs.append(Sigmap)
if i > self.window:
rolling_vars = np.std(y[:, (i-self.window):i], axis=1) ** 2
self.R = np.diag(rolling_vars).reshape(self.R.shape)
self.R = self.Rold
return np.array(means), np.array(covs)
apkf = AsyncKalman(x_0, Sigma_0, A, G, Q, R, GPS, B, u)
print(apkf)
xc, Sigmac = x_0, Sigma_0
ameans, acovs = apkf.filter(y)
#############################################################################
asmeans, ascovs = apkf.smooth(y)
#############################################################################
n_steps = 20
afsmeans, afscovs, afomean, afocovs = apkf.forecast(y, n_steps)
#############################################################################
aakf = AsyncAdaptiveKalman(x_0, Sigma_0, A, G, Q, R, GPS, B, u, window=20)
aadmeans, aadcovs = aakf.filter(y)
aadsmeans, aadscovs = aakf.smooth(y)
aadfsmeans, aadfscovs, aadfomean, aadfocovs = aakf.forecast(y, n_steps)
fig = plt.figure(figsize=(15,30))
j = 0
j1 = 1
ax1 = fig.add_subplot(8, 2, 1)
plot_kalman(ameans[:, j, :], acovs[:, j, j], afsmeans[:, j, :], afscovs[:, j, j], true[j], num_sigmas=2,
ax=ax1, title="Position vs Time: Filtered")
ax2 = fig.add_subplot(8, 2, 2)
plot_kalman(ameans[:, j1, :], acovs[:, j1, j1], afsmeans[:, j1, :], afscovs[:, j1, j1], true[j1], num_sigmas=2,
ax=ax2, title="Velocity vs Time: Filtered")
ax3 = fig.add_subplot(8, 2, 3)
plot_kalman(asmeans[:, j, :], ascovs[:, j, j], afsmeans[:, j, :], afscovs[:, j, j], true[j], num_sigmas=2,
ax=ax3, title="Position vs Time: Smoothed")
ax4 = fig.add_subplot(8, 2, 4)
plot_kalman(asmeans[:, j1, :], ascovs[:, j1, j1], afsmeans[:, j1, :], afscovs[:, j1, j1], true[j1], num_sigmas=2,
ax=ax4, title="Velocity vs Time: Smoothed")
ax5 = fig.add_subplot(8, 2, 5)
plot_kalman(aadmeans[:, j, :], aadcovs[:, j, j], aadfsmeans[:, j, :], aadfscovs[:, j, j], true[j], num_sigmas=2,
ax=ax5, title="Position vs Time: Adaptive Kalman Filter")
ax6 = fig.add_subplot(8, 2, 6)
plot_kalman(aadmeans[:, j1, :], aadcovs[:, j1, j1], aadfsmeans[:, j1, :], aadfscovs[:, j1, j1], true[j1], num_sigmas=2,
ax=ax6, title="Velocity vs Time: Adaptive Kalman Filter")
ax7 = fig.add_subplot(8, 2, 7)
plot_kalman(aadsmeans[:, j, :], aadscovs[:, j, j], aadfsmeans[:, j, :], aadfscovs[:, j, j], true[j], num_sigmas=2,
ax=ax7, title="Position vs Time: Adaptive Kalman Smoother")
ax8 = fig.add_subplot(8, 2, 8)
plot_kalman(aadsmeans[:, j1, :], aadscovs[:, j1, j1], aadfsmeans[:, j1, :], aadfscovs[:, j1, j1], true[j1], num_sigmas=2,
ax=ax8, title="Velocity vs Time: Adaptive Kalman Smoother")
plt.show()
fig = plt.figure(figsize=(15,30))
j = 3
j1 = 4
ax1 = fig.add_subplot(8, 2, 1)
plot_kalman(ameans[:, j, :], acovs[:, j, j], afsmeans[:, j, :], afscovs[:, j, j], true[j], num_sigmas=2,
ax=ax1, title="Position vs Time: Filtered")
ax2 = fig.add_subplot(8, 2, 2)
plot_kalman(ameans[:, j1, :], acovs[:, j1, j1], afsmeans[:, j1, :], afscovs[:, j1, j1], true[j1], num_sigmas=2,
ax=ax2, title="Velocity vs Time: Filtered")
ax3 = fig.add_subplot(8, 2, 3)
plot_kalman(asmeans[:, j, :], ascovs[:, j, j], afsmeans[:, j, :], afscovs[:, j, j], true[j], num_sigmas=2,
ax=ax3, title="Position vs Time: Smoothed")
ax4 = fig.add_subplot(8, 2, 4)
plot_kalman(asmeans[:, j1, :], ascovs[:, j1, j1], afsmeans[:, j1, :], afscovs[:, j1, j1], true[j1], num_sigmas=2,
ax=ax4, title="Velocity vs Time: Smoothed")
ax5 = fig.add_subplot(8, 2, 5)
plot_kalman(aadmeans[:, j, :], aadcovs[:, j, j], aadfsmeans[:, j, :], aadfscovs[:, j, j], true[j], num_sigmas=2,
ax=ax5, title="Position vs Time: Adaptive Kalman Filter")
ax6 = fig.add_subplot(8, 2, 6)
plot_kalman(aadmeans[:, j1, :], aadcovs[:, j1, j1], aadfsmeans[:, j1, :], aadfscovs[:, j1, j1], true[j1], num_sigmas=2,
ax=ax6, title="Velocity vs Time: Adaptive Kalman Filter")
ax7 = fig.add_subplot(8, 2, 7)
plot_kalman(aadsmeans[:, j, :], aadscovs[:, j, j], aadfsmeans[:, j, :], aadfscovs[:, j, j], true[j], num_sigmas=2,
ax=ax7, title="Position vs Time: Adaptive Kalman Smoother")
ax8 = fig.add_subplot(8, 2, 8)
plot_kalman(aadsmeans[:, j1, :], aadscovs[:, j1, j1], aadfsmeans[:, j1, :], aadfscovs[:, j1, j1], true[j1], num_sigmas=2,
ax=ax8, title="Velocity vs Time: Adaptive Kalman Smoother")
plt.show()
fig = plt.figure(figsize=(15,30))
j = 2
j1 = 5
ax1 = fig.add_subplot(8, 2, 1)
plot_kalman(ameans[:, j, :], acovs[:, j, j], afsmeans[:, j, :], afscovs[:, j, j], true[j], num_sigmas=2,
ax=ax1, title="Position vs Time: Filtered")
ax2 = fig.add_subplot(8, 2, 2)
plot_kalman(ameans[:, j1, :], acovs[:, j1, j1], afsmeans[:, j1, :], afscovs[:, j1, j1], true[j1], num_sigmas=2,
ax=ax2, title="Velocity vs Time: Filtered")
ax3 = fig.add_subplot(8, 2, 3)
plot_kalman(asmeans[:, j, :], ascovs[:, j, j], afsmeans[:, j, :], afscovs[:, j, j], true[j], num_sigmas=2,
ax=ax3, title="Position vs Time: Smoothed")
ax4 = fig.add_subplot(8, 2, 4)
plot_kalman(asmeans[:, j1, :], ascovs[:, j1, j1], afsmeans[:, j1, :], afscovs[:, j1, j1], true[j1], num_sigmas=2,
ax=ax4, title="Velocity vs Time: Smoothed")
ax5 = fig.add_subplot(8, 2, 5)
plot_kalman(aadmeans[:, j, :], aadcovs[:, j, j], aadfsmeans[:, j, :], aadfscovs[:, j, j], true[j], num_sigmas=2,
ax=ax5, title="Position vs Time: Adaptive Kalman Filter")
ax6 = fig.add_subplot(8, 2, 6)
plot_kalman(aadmeans[:, j1, :], aadcovs[:, j1, j1], aadfsmeans[:, j1, :], aadfscovs[:, j1, j1], true[j1], num_sigmas=2,
ax=ax6, title="Velocity vs Time: Adaptive Kalman Filter")
ax7 = fig.add_subplot(8, 2, 7)
plot_kalman(aadsmeans[:, j, :], aadscovs[:, j, j], aadfsmeans[:, j, :], aadfscovs[:, j, j], true[j], num_sigmas=2,
ax=ax7, title="Position vs Time: Adaptive Kalman Smoother")
ax8 = fig.add_subplot(8, 2, 8)
plot_kalman(aadsmeans[:, j1, :], aadscovs[:, j1, j1], aadfsmeans[:, j1, :], aadfscovs[:, j1, j1], true[j1], num_sigmas=2,
ax=ax8, title="Velocity vs Time: Adaptive Kalman Smoother")
plt.show()
class KFKalmanGain(PlainKalmanFilter):
""" In addition, calculates Kalman gain."""
def __init__(self, x, Sigma, A, G, Q, R, B=None, u=None):
super().__init__(x, Sigma, A, G, Q, R, B, u)
def filter_state(self, y, x, Sigma, calc_kg=False):
""" Updates/filters (x, Sigma) to posterior moments based on a current single measurement y_t."""
assert (y.shape == (self.n_obs, 1))
assert (x.shape == (self.n_states, 1))
assert (Sigma.shape == (self.n_states, self.n_states))
z = y - np.dot(self.G, x) # innovation or observation residual
S = np.dot(self.G, np.dot(Sigma, self.G.T)) + self.R # innovation or residual covariance
SI = np.linalg.pinv(S)
K = np.dot(Sigma, np.dot(self.G.T, SI)) # optimal Kalman gain
x_post = x + np.dot(K, z) # updated (a posteriori) state estimate
Sigma_post = Sigma - np.dot(K, np.dot(self.G, Sigma)) # updated (a posteriori) covariance estimate
if calc_kg:
return x_post, Sigma_post, K
return x_post, Sigma_post
def filter(self, y, calc_kg=False):
""" Performs sequential state update followed by measurement update.
Returns all {x_t}, {Sigma_t} given data {y_t}."""
assert (y.shape[0] == self.n_obs)
means = []
covs = []
kgain = []
xp, Sigmap = self.x, self.Sigma
for i, yi in enumerate(y.T):
obs = np.atleast_2d(yi).reshape(self.n_obs, 1)
xp, Sigmap = self.predict_next_state(xp, Sigmap)
if calc_kg:
xp, Sigmap, K = self.filter_state(obs, xp, Sigmap, calc_kg=True)
kgain.append(K)
xp, Sigmap = self.filter_state(obs, xp, Sigmap)
means.append(xp)
covs.append(Sigmap)
if calc_kg:
return np.array(means), np.array(covs), np.array(kgain)
return np.array(means), np.array(covs)
class AsyncKFKalmanGain(PlainKalmanFilter):
""" In addition, calculates Kalman gain."""
def __init__(self, x, Sigma, A, G, Q, R, GPS, B=None, u=None):
super().__init__(x, Sigma, A, G, Q, R, B, u)
self.GPS = GPS
def filter_state(self, y, x, Sigma, calc_kg=False):
""" Updates/filters (x, Sigma) to posterior moments based on a current single measurement y_t."""
assert (y.shape == (self.n_obs, 1))
assert (x.shape == (self.n_states, 1))
assert (Sigma.shape == (self.n_states, self.n_states))
z = y - np.dot(self.G, x) # innovation or observation residual
S = np.dot(self.G, np.dot(Sigma, self.G.T)) + self.R # innovation or residual covariance
SI = np.linalg.pinv(S)
K = np.dot(Sigma, np.dot(self.G.T, SI)) # optimal Kalman gain
x_post = x + np.dot(K, z) # updated (a posteriori) state estimate
Sigma_post = Sigma - np.dot(K, np.dot(self.G, Sigma)) # updated (a posteriori) covariance estimate
if calc_kg:
return x_post, Sigma_post, K
return x_post, Sigma_post
def filter(self, y, calc_kg=False):
""" Performs sequential state update followed by measurement update
if GPS signal/data is available.
GPS is a boolean array of same length as y.
Returns all {x_t}, {Sigma_t} given data {y_t}."""
assert (y.shape[0] == self.n_obs)
assert (y.shape[1] == self.GPS.shape[0])
means = []
covs = []
kgain = []
xp, Sigmap = self.x, self.Sigma
for i, yi in enumerate(y.T):
obs = np.atleast_2d(yi).reshape(self.n_obs, 1)
xp, Sigmap = self.predict_next_state(xp, Sigmap)
if self.GPS[i]:
if calc_kg:
xp, Sigmap, K = self.filter_state(obs, xp, Sigmap, calc_kg=True)
xp, Sigmap = self.filter_state(obs, xp, Sigmap)
means.append(xp)
covs.append(Sigmap)
if calc_kg:
kgain.append(K)
if calc_kg:
return np.array(means), np.array(covs), np.array(kgain)
return np.array(means), np.array(covs)
kf = KalmanFilter(initial_state_mean=x_0.T.tolist()[0],
initial_state_covariance=Sigma_0,
transition_matrices=A,
transition_covariance=Q,
observation_matrices=G,
observation_covariance=R)
# kf = KalmanFilter(transition_matrices=A, observation_matrices=G)
# kf = kf.em(measurements, n_iter=n_iter)
(filtered_state_means, filtered_state_covariances) = kf.filter(y.T.tolist())
(smoothed_state_means, smoothed_state_covariances) = kf.smooth(y.T.tolist())
pkf = KFKalmanGain(x_0, Sigma_0, A, G, Q, R, B, u)
print(pkf)
xc, Sigmac = x_0, Sigma_0
means, covs, kgain = pkf.filter(y, calc_kg=True)
apkf = AsyncKFKalmanGain(x_0, Sigma_0, A, G, Q, R, GPS, B, u)
print(apkf)
xc, Sigmac = x_0, Sigma_0
ameans, acovs, akgain = apkf.filter(y, calc_kg=True)
plt.plot(kgain[:, 0, 0])
plt.plot(akgain[:, 0, 0])
plt.plot(kgain[:, 3, 1])
plt.plot(akgain[:, 3, 1])
error1 = [(means[:,i,0] - true[i]).std() for i in range(6)]
error2 = [(ameans[:,i,0] - true[i]).std() for i in range(6)]
error3 = [(filtered_state_means[:,i] - true[i]).std() for i in range(6)]
error1, error2, error3
Another interesting application of extended Kalman filter on GPS data can be found here
import gpxpy
import pandas as pd
import mplleaflet
# from pykalman import KalmanFilter
with open('../data/gps_track.gpx', 'r') as fh:
gpx_file = gpxpy.parse(fh)
segment = gpx_file.tracks[1].segments[0]
coords = pd.DataFrame([
{'lat': p.latitude,
'lon': p.longitude,
'ele': p.elevation,
'time': p.time} for p in segment.points])
coords.set_index('time', drop=True, inplace=True)
coords.head()
rate = 2
fig = plt.figure()
plt.rcParams['figure.figsize'] = (14,12)
plt.plot(coords['lon'].values[::rate], coords['lat'].values[::rate], 'r-o', linewidth=0.5);
mplleaflet.display(fig=fig)
plt.rcParams['figure.figsize'] = (8,4)
plt.plot(coords['ele'].values);
segment.points[0].speed, segment.points[-1].speed = 0.0, 0.0
gpx_file.add_missing_speeds()
speed = np.array([p.speed for p in segment.points]) * 3.6
plt.plot(speed);
segment.get_uphill_downhill()
Let's choose our state to be: $x = (x_1, x_2, x_3, v_1, v_2, v_3)$, and dynamics to be governed by non accelerating motion:
We will measure $(x_1, x_2, x_3)$, and our measurement equation would be:
dt = 1 # sampling rate
x_0 = np.zeros((6, 1))
A = np.eye(6)
A[:3,3:] = dt * np.eye(3)
G = np.zeros((3, 6))
G[:3,:3] = np.eye(3)
R = np.diag([1e-4, 1e-4, 100]) ** 2 # approximate GPS errors
# Make time samples equidistant and equal to 1 sec
coords.index = np.round(coords.index.astype(np.int64), -9).astype('datetime64[ns]')
# We fix signal loss by resampling
coords = coords.resample('1S').asfreq()
coords.loc[coords.ele.isnull()].head()
# missing values should be masked for Kalman filter
measurements = np.ma.masked_invalid(coords[['lon', 'lat', 'ele']].values)
initial_state_mean = np.hstack([measurements[0, :], 3*[0.0]])
initial_state_covariance = np.diag([1e-4, 1e-4, 50, 1e-6, 1e-6, 1e-6]) ** 2
plt.plot(measurements[:,0], measurements[:,1])
filled_coords = coords.fillna(method='pad').ix[coords.ele.isnull()]
plt.plot(filled_coords['lon'].values, filled_coords['lat'].values, 'r.');
kf = KalmanFilter(transition_matrices=A,
observation_matrices=G,
observation_covariance=R,
initial_state_mean=initial_state_mean,
initial_state_covariance=initial_state_covariance,
em_vars=['transition_covariance'])
kf = kf.em(measurements, n_iter=100)
state_means, state_vars = kf.smooth(measurements)
plt.plot(filled_coords['ele'].values, 'k--')
plt.plot(state_means[:,2], 'r-')
plt.show()
After getting the smoothed data
coords.shape, state_means.shape
coords.ix[:, ['lon', 'lat', 'ele']] = state_means[:,:3]
coords = coords.reset_index()
orig_coords = coords.ix[~coords.index.isnull()]#.set_index('idx')
for i, p in enumerate(segment.points):
p.speed = None
p.elevation = orig_coords.at[i, 'ele']
p.longitude = orig_coords.at[i, 'lon']
p.latitude = orig_coords.at[i, 'lat']
segment.get_uphill_downhill()
import pandas as pd
from bokeh.plotting import figure, show, output_notebook
from nsepy import get_history
from datetime import date, datetime
from sklearn.metrics import roc_curve
from sklearn.metrics import roc_auc_score
def plot_roc_curve(fpr, tpr, auc):
plt.plot(fpr, tpr, color='orange', label='ROC AUC: %.2f' % auc)
plt.plot([0, 1], [0, 1], color='darkblue', linestyle='--')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic (ROC) Curve')
plt.legend()
plt.show()
df = get_history(symbol='TCS', start = date(2019,1,1), end = date(2019,5,25))
returns = 100 * df[['Close']].pct_change().dropna().values
n = returns.shape[0]
n_train = int(0.75*n)
r_train, r_test = returns[:n_train], returns[n_train:]
r = returns
# arguments are: x_0, Sigma_0, A, G, Q, R
pkf = PlainKalmanFilter(r[0,0].reshape(-1,1), np.atleast_2d([0.1]), np.atleast_2d([1]),
np.atleast_2d([1]), np.atleast_2d([0.1]), np.atleast_2d([1]))
y = r.reshape(1, -1)
means, covs = pkf.filter(y)
fpr, tpr, thresholds = roc_curve(np.sign(means.ravel()), np.sign(r.ravel()))
auc = roc_auc_score(np.sign(means.ravel()), np.sign(r.ravel()))
plot_roc_curve(fpr, tpr, auc)
kf = KalmanFilter(transition_matrices = [1],
observation_matrices = [1],
initial_state_mean = r[0],
initial_state_covariance = 0.1,
observation_covariance=0.1,
transition_covariance=.01)
# kf = KalmanFilter(initial_state_mean = r[0], transition_matrices = [1], observation_matrices = [1])
# kf = kf.em(returns, n_iter=20)
state_means,_ = kf.filter(y)
state_means = state_means.flatten()
fpr, tpr, thresholds = roc_curve(np.sign(state_means), np.sign(y.ravel()))
auc = roc_auc_score(np.sign(state_means), np.sign(y.ravel()))
plot_roc_curve(fpr, tpr, auc)
df["date"] = pd.to_datetime(df.index)
mids = (df.Open + df.Close)/2
spans = abs(df.Close-df.Open)
inc = df.Close > df.Open
dec = df.Open > df.Close
w = 12*60*60*1000 # half day in ms
prices = df[['Close']].dropna().values
# arguments are: x_0, Sigma_0, A, G, Q, R
pkf = PlainKalmanFilter(np.atleast_2d([prices[0,0]]), np.atleast_2d([0.01]), np.atleast_2d([1]),
np.atleast_2d([1]), np.atleast_2d([0.01]), np.atleast_2d([0.1]))
print(pkf)
price_state_means,_ = pkf.filter(prices.T)
price_state_means = price_state_means.flatten()
sprice_state_means,_ = pkf.smooth(prices.T)
sprice_state_means = sprice_state_means.flatten()
output_notebook()
TOOLS = "pan,wheel_zoom,box_zoom,reset,save"
p = figure(x_axis_type="datetime", tools=TOOLS, plot_width=900, toolbar_location="left", y_axis_label = "Price",
x_axis_label = "Date", y_range=(1800, 2400))
p.segment(df.date, df.High, df.date, df.Low, color="black")
p.rect(df.date[inc], mids[inc], w, spans[inc], fill_color='green', line_color="green")
p.rect(df.date[dec], mids[dec], w, spans[dec], fill_color='red', line_color="red")
p.line(df.date, price_state_means, line_width=2.2, line_color = 'blue', legend="Kalman filter")
p.line(df.date, sprice_state_means, line_width=1.8, line_color = 'cyan', legend="Kalman smoother")
p.xaxis.major_label_orientation = np.pi/4
p.grid.grid_line_alpha=0.3
show(p)
kfall = KalmanFilter(transition_matrices = [1],
observation_matrices = [1],
initial_state_mean = returns[0],
initial_state_covariance = 0.01,
observation_covariance=0.1,
transition_covariance=.01)
meansall, covariancesall = kfall.filter(returns)
window = 1
pred = []
cov_0 = 0.01
mean_0 = returns[0,0]
for i in range(window, returns.shape[0]):
obs = returns[:i]
kf = KalmanFilter(transition_matrices = [1],
observation_matrices = [1],
initial_state_mean = obs[-1],
initial_state_covariance = cov_0,
observation_covariance=0.1,
transition_covariance=.01)
means, covariances = kf.smooth(obs)
mean_0, cov_0 = means[-1], covariances[-1]
p, _ = kf.sample(1, mean_0)
pred.append(p)
pred = np.array(pred).ravel()
plt.figure(figsize=(12, 6))
plt.plot(pred.ravel(), 'r', linewidth=1.5)
plt.plot(returns[window:].ravel(), 'k--', linewidth=0.5)
plt.plot(meansall[window:].ravel(), 'b--', linewidth=1.5)
plt.ylim(-2, 2);
fig = plt.figure(figsize=plt.figaspect(0.3))
ax = fig.add_subplot(1, 1, 1)
ax.plot(df.index[window+1:], pred.ravel()-returns[window:].ravel(), 'g-', linewidth=0.8, label="sequential Kalman")
ax.plot(df.index[window+1:], state_means[window:].ravel()-returns[window:].ravel(), 'r-', linewidth=1.2, label="pykalman filter")
ax.set_ylim([-6, 8])
ax.legend(loc="upper left")
ax.set_title("Returns: Kalmans - true data")
plt.show()
# fraction of matches between signs of predicted and actial returns
hit_score = np.sum(np.sign(np.array(pred.ravel())) == np.sign(returns[window:].ravel()))/len(pred.ravel())
print("hit score:", hit_score)
fpr, tpr, thresholds = roc_curve(np.sign(np.array(pred.ravel())), np.sign(returns[window:].ravel()))
auc = roc_auc_score(np.sign(np.array(pred.ravel())), np.sign(returns[window:].ravel()))
plot_roc_curve(fpr, tpr, auc)
Assume that the hidden state $x = [b, a]$ is a line, $(b z + a)$, that observations, $y$, follow. Our initial guess for state mean and covariance are: $\hat{x}_0 = [0, 0]$ and $\Sigma_0 = 1_{2\times 2}$. We also assume that our parameters follow a random walk, therefore, transition matrix is the identity, $A=1$, and transition covariance is a small number times the identity, $Q \propto 1_{2 \times 2}$.
The observation matrix should be $G = [1_{N \times 1}, z_{N \times 1}]_{N \times 2}$, where $z_t$ is another time series vector. We assume that the variance of our observations, $R=2$. We can use observations $𝑦_t$ to evolve our estimates of the parameters $a$ and $b$ with time.
Part of this material is taken from Quantopian Lecture Series: Kalman Filters
a = 1.3
b = 3.2
N = 2000
sigma_y = 20
z = 10 * np.random.randn(N)
y = b * z + a + sigma_y * np.random.randn(N)
plt.scatter(z, y)
delta = 1e-3
trans_cov = delta / (1 - delta) * np.eye(2) # How much random walk wiggles
obs_mat = np.expand_dims(np.vstack([[z], [np.ones(len(z))]]).T, axis=1)
kf = KalmanFilter(n_dim_obs=1, n_dim_state=2, # y is 1-dimensional, (alpha, beta) is 2-dimensional
initial_state_mean=[0, 0],
initial_state_covariance=np.ones((2, 2)),
transition_matrices=np.eye(2),
observation_matrices=obs_mat,
observation_covariance=sigma_y ** 2,
transition_covariance=trans_cov)
# Use the observations y to get running estimates and errors for the state parameters
state_means, state_covs = kf.filter(y)
smoothed_state_means, smoothed_state_covs = kf.smooth(y)
tt = np.arange(N)
_, axarr = plt.subplots(2, sharex=True)
axarr[0].plot(tt, state_means[:,0], 'r', label='filtered slope')
axarr[0].plot(tt, smoothed_state_means[:,0], 'k--', label='smoothed slope')
axarr[0].legend()
axarr[1].plot(tt, state_means[:,1], 'b', label='filtered intercept')
axarr[1].plot(tt, smoothed_state_means[:,1], 'k--', label='smoothed intercept')
axarr[1].legend()
plt.tight_layout()
axarr[0].set_title("Best estimates - of a and b over time.", fontsize=22)
plt.show()
kf = KalmanFilter(transition_matrices=np.eye(2),
observation_matrices=obs_mat)
kf = kf.em(y, n_iter=40)
state_means, state_covs = kf.filter(y)
smoothed_state_means, smoothed_state_covs = kf.smooth(y)
tt = np.arange(N)
_, axarr = plt.subplots(2, sharex=True)
axarr[0].plot(tt, state_means[:,0], 'r', label='filtered slope')
axarr[0].plot(tt, smoothed_state_means[:,0], 'k--', label='smoothed slope')
axarr[0].legend()
axarr[1].plot(tt, state_means[:,1], 'b', label='filtered intercept')
axarr[1].plot(tt, smoothed_state_means[:,1], 'k--', label='smoothed intercept')
axarr[1].legend()
plt.tight_layout()
axarr[0].set_title("Best estimates - of a and b over time.", fontsize=22)
plt.show()
notebook_file = r"TS1_Kalman_Filter.ipynb"
html_converter(notebook_file)